Overview

The advanced_metrics_processor.py script enriches the base analysis with advanced metrics derived from historical OHLCV (Open, High, Low, Close, Volume) data. It processes CSV files containing price and volume data to calculate technical metrics that require time series analysis.

Purpose

This script adds sophisticated technical analysis metrics including:
  • Volume analysis (RVOL, turnover, EMA volume)
  • Volatility metrics (ADR - Average Daily Range)
  • Price benchmarks (ATH, 52-week low, historical returns)
  • Gap analysis and intraday range calculations
  • Circuit limit mapping

Input Files Required

  • all_stocks_fundamental_analysis.json (JSON, required) — Base analysis file generated by bulk_market_analyzer.py. This file serves as both input and output.
  • complete_price_bands.json (JSON, required) — NSE price band (circuit limit) data containing current circuit limit percentages for each symbol.
  • ohlcv_data/ (Directory, required) — Directory containing one CSV file per stock with daily OHLCV data, named {SYMBOL}.csv.

OHLCV CSV Format

Each CSV file should contain:
Date,Open,High,Low,Close,Volume
2024-01-01,100.50,102.30,99.80,101.20,1500000
2024-01-02,101.50,103.00,101.00,102.50,1800000
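A minimal sketch of loading and validating a file in this format. The helper name load_ohlcv is illustrative and not part of the script; it only checks for the expected columns and sorts by date.

```python
import io
import pandas as pd

REQUIRED_COLS = ["Date", "Open", "High", "Low", "Close", "Volume"]

def load_ohlcv(source):
    """Load an OHLCV CSV and verify it has the expected columns."""
    df = pd.read_csv(source, parse_dates=["Date"])
    missing = [c for c in REQUIRED_COLS if c not in df.columns]
    if missing:
        raise ValueError(f"missing columns: {missing}")
    return df.sort_values("Date").reset_index(drop=True)

# The two sample rows from the format above
sample = io.StringIO(
    "Date,Open,High,Low,Close,Volume\n"
    "2024-01-01,100.50,102.30,99.80,101.20,1500000\n"
    "2024-01-02,101.50,103.00,101.00,102.50,1800000\n"
)
df = load_ohlcv(sample)
print(len(df), df["Close"].iloc[-1])  # 2 102.5
```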

Output Produced

  • all_stocks_fundamental_analysis.json (JSON) — The master analysis file, updated in-place by adding/updating advanced metric fields for each stock.

Processing Logic

1. EMA Calculation

Implements exponential moving average calculation:
def calculate_ema(series, periods):
    return series.ewm(span=periods, adjust=False).mean()

2. Per-Symbol CSV Processing

Each stock’s OHLCV file is processed independently using concurrent execution:
def process_symbol_csv(csv_path):
    sym = os.path.basename(csv_path).replace(".csv", "")
    df = pd.read_csv(csv_path)
    
    # Ensure numeric columns
    for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
        df[col] = pd.to_numeric(df[col], errors='coerce')
    
    df = df.dropna()
    if df.empty:
        return sym, None
    
    # Get latest and previous rows
    latest = df.iloc[-1]
    prev = df.iloc[-2] if len(df) > 1 else latest

3. All-Time High (ATH) Calculation

With hybrid fix for live price integration:
# In process_symbol_csv
ath = df['High'].max()
pct_from_ath = ((ath - latest['Close']) / ath) * 100 if ath > 0 else 0

# In main() - Hybrid Fix to eliminate 1-day lag
live_ltp = pd.to_numeric(stock.get("Ltp"), errors='coerce')
if pd.notnull(live_ltp) and live_ltp > 0:
    ath = metrics.get("ATH_Value", 0)
    if ath > 0:
        metrics["% from ATH"] = round(((ath - live_ltp) / ath) * 100, 2)

4. Gap Analysis

Calculates opening gap relative to previous close:
gap_up_pct = ((latest['Open'] - prev['Close']) / prev['Close']) * 100 if prev['Close'] > 0 else 0
day_range_pct = ((latest['High'] - latest['Low']) / latest['Low']) * 100 if latest['Low'] > 0 else 0

5. Average Daily Range (ADR) Calculation

Calculates volatility across multiple timeframes:
# Calculate daily range percentage for each day
df['Daily_Range_Pct'] = ((df['High'] - df['Low']) / df['Low']) * 100

# Moving averages of ADR
adr_5 = df['Daily_Range_Pct'].tail(5).mean()
adr_14 = df['Daily_Range_Pct'].tail(14).mean()
adr_20 = df['Daily_Range_Pct'].tail(20).mean()
adr_30 = df['Daily_Range_Pct'].tail(30).mean()

6. Multi-Period Returns

Calculates historical returns over various lookback periods:
# 6 Month Return (~126 trading days)
price_6m_ago = df['Close'].iloc[-126] if len(df) >= 126 else df['Close'].iloc[0]
returns_6m = ((latest['Close'] - price_6m_ago) / price_6m_ago) * 100

# 52W Low Distance (~252 trading days)
low_52w = df['Low'].tail(252).min()
pct_from_52w_low = ((latest['Close'] - low_52w) / low_52w) * 100 if low_52w > 0 else 0

7. Volume Metrics Processing

# Turnover in Crores
df['Turnover_Cr'] = (df['Close'] * df['Volume']) / 10000000
avg_rupee_vol_30 = df['Turnover_Cr'].tail(30).mean()

# Relative Volume (RVOL)
avg_vol_20 = df['Volume'].tail(21).iloc[:-1].mean()  # Exclude latest day
rvol = latest['Volume'] / avg_vol_20 if avg_vol_20 > 0 else 0

# 200-day EMA Volume
df['EMA_Vol_200'] = calculate_ema(df['Volume'], 200)
ema_vol_200_latest = df['EMA_Vol_200'].iloc[-1]

# Distance from 52W High of EMA Volume
ema_vol_200_52w_high = df['EMA_Vol_200'].tail(252).max()
pct_from_ema_200_52w_high = ((ema_vol_200_latest - ema_vol_200_52w_high) / ema_vol_200_52w_high) * 100
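The tail(21).iloc[:-1] idiom in the RVOL calculation takes the 21 most recent rows and drops the last one, so the average covers the 20 days before today rather than including today's own volume. A demonstration on illustrative data (not from the script):

```python
import pandas as pd

# Twenty days of 1,000,000 shares, then a 3,000,000-share spike today
vol = pd.Series([1_000_000] * 20 + [3_000_000])

latest_volume = vol.iloc[-1]
avg_vol_20 = vol.tail(21).iloc[:-1].mean()  # excludes today's spike
rvol = latest_volume / avg_vol_20 if avg_vol_20 > 0 else 0
print(rvol)  # 3.0 -- today traded at 3x its 20-day average
```

Including today's spike in the denominator would dilute the signal; excluding it keeps RVOL a clean ratio of "today vs. a normal day".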

8. Turnover Moving Averages

turnover_20 = df['Turnover_Cr'].tail(20).mean()
turnover_50 = df['Turnover_Cr'].tail(50).mean()
turnover_100 = df['Turnover_Cr'].tail(100).mean()

9. Parallel Processing

Uses ThreadPoolExecutor for efficient batch processing:
advanced_metrics_map = {}
with ThreadPoolExecutor(max_workers=10) as executor:
    futures = [executor.submit(process_symbol_csv, cf) for cf in csv_files]
    for future in futures:
        sym, result = future.result()
        if result:
            advanced_metrics_map[sym] = result

10. Circuit Limit Integration

# Load price bands
price_band_map = {}
with open(PRICE_BANDS_FILE, "r") as f:
    pb_data = json.load(f)
    for item in pb_data:
        price_band_map[item.get("Symbol")] = item.get("Band")

# Update master data
for stock in base_data:
    sym = stock.get("Symbol")
    if sym in price_band_map:
        stock["Circuit Limit"] = price_band_map[sym]

Fields Added/Modified

This script adds/updates the following fields in the master JSON:

Volume Metrics

  • 30 Days Average Rupee Volume(Cr.): Average daily turnover over 30 days
  • RVOL: Relative volume (current volume / 20-day average volume)
  • Daily Rupee Turnover 20(Cr.): 20-day moving average of daily turnover
  • Daily Rupee Turnover 50(Cr.): 50-day moving average of daily turnover
  • Daily Rupee Turnover 100(Cr.): 100-day moving average of daily turnover
  • 200 Days EMA Volume: 200-period exponential moving average of volume
  • % from 52W High 200 Days EMA Volume: Distance from 52-week high of EMA volume

Volatility Metrics

  • 5 Days MA ADR(%): 5-day moving average of average daily range
  • 14 Days MA ADR(%): 14-day moving average of average daily range
  • 20 Days MA ADR(%): 20-day moving average of average daily range
  • 30 Days MA ADR(%): 30-day moving average of average daily range
  • Day Range(%): Intraday high-low range as percentage

Price Benchmarks

  • % from ATH: Distance from all-time high (with live price correction)
  • Gap Up %: Opening gap from previous close (replaces placeholder)
  • 6 Month Returns(%): Price return over 6 months (~126 trading days)
  • % from 52W Low: Distance from 52-week low

Market Structure

  • Circuit Limit: NSE circuit limit percentage (price band)

Code Example

advanced_metrics_processor.py
import pandas as pd
import json
import os
import glob
from concurrent.futures import ThreadPoolExecutor

def calculate_ema(series, periods):
    return series.ewm(span=periods, adjust=False).mean()

def process_symbol_csv(csv_path):
    sym = os.path.basename(csv_path).replace(".csv", "")
    df = pd.read_csv(csv_path)
    latest = df.iloc[-1]
    
    # Calculate ATH
    ath = df['High'].max()
    pct_from_ath = ((ath - latest['Close']) / ath) * 100 if ath > 0 else 0
    
    # Calculate ADR
    df['Daily_Range_Pct'] = ((df['High'] - df['Low']) / df['Low']) * 100
    adr_20 = df['Daily_Range_Pct'].tail(20).mean()
    
    metrics = {"% from ATH": round(pct_from_ath, 2)}  # ... remaining metrics elided
    return sym, metrics

def main():
    # Load base data
    with open(JSON_INPUT, "r") as f:
        base_data = json.load(f)
    
    # Process OHLCV files in parallel
    csv_files = glob.glob(os.path.join(OHLCV_DIR, "*.csv"))
    advanced_metrics_map = {}
    with ThreadPoolExecutor(max_workers=10) as executor:
        futures = [executor.submit(process_symbol_csv, cf) for cf in csv_files]
        for future in futures:
            sym, result = future.result()
            if result:
                advanced_metrics_map[sym] = result
    
    # Update master JSON
    for stock in base_data:
        sym = stock.get("Symbol")
        if sym in advanced_metrics_map:
            stock.update(advanced_metrics_map[sym])
    
    with open(JSON_OUTPUT, "w") as f:
        json.dump(base_data, f, indent=4)

Function Reference

calculate_ema(series, periods)

Calculates exponential moving average using pandas. Parameters:
  • series: Pandas Series of numeric values
  • periods: Number of periods for EMA calculation
Returns: Pandas Series containing EMA values

process_symbol_csv(csv_path)

Processes a single stock’s OHLCV CSV file and calculates all advanced metrics. Parameters:
  • csv_path: Full path to the CSV file
Returns: Tuple of (symbol, metrics_dict) or (symbol, None) if processing fails

main()

Orchestrates the entire processing pipeline including loading data, parallel processing, and updating the master JSON. Returns: None (writes output to JSON file)

Performance Notes

  • Parallel Processing: Uses ThreadPoolExecutor with 10 workers for concurrent CSV processing
  • Processing Time: ~2,000 stocks processed in 10-20 seconds
  • Memory Efficiency: Processes one CSV at a time per thread
  • Error Handling: Gracefully handles missing/corrupt CSV files
  • Hybrid Fix: Eliminates 1-day lag in ATH calculation by using live LTP when available
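The notes above mention graceful handling of missing or corrupt CSV files. One plausible pattern, consistent with the (symbol, None) return contract documented for process_symbol_csv, is a broad try/except wrapper; this sketch is an assumption about the approach, not the script's exact code:

```python
import os
import pandas as pd

def process_symbol_csv_safe(csv_path):
    """Illustrative wrapper: any parse or compute failure yields (symbol, None)."""
    sym = os.path.basename(csv_path).replace(".csv", "")
    try:
        df = pd.read_csv(csv_path)
        for col in ['Open', 'High', 'Low', 'Close', 'Volume']:
            df[col] = pd.to_numeric(df[col], errors='coerce')
        df = df.dropna()
        if df.empty:
            return sym, None
        return sym, {"rows": len(df)}  # stand-in for the real metrics dict
    except Exception:
        # Missing file, malformed CSV, absent columns, etc. all land here
        return sym, None

sym, result = process_symbol_csv_safe("does_not_exist/MISSING.csv")
print(sym, result)  # MISSING None
```

Because each worker returns None instead of raising, one bad file cannot abort the whole ThreadPoolExecutor batch; the symbol is simply skipped when building the metrics map.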

Dependencies

  • pandas: DataFrame operations and EMA calculations
  • json: JSON file handling
  • os: File path operations
  • glob: File pattern matching
  • concurrent.futures: Parallel processing

Important Notes

  1. Dependency: Must run after bulk_market_analyzer.py
  2. In-Place Update: Modifies the master JSON file directly
  3. Data Freshness: ATH calculation uses hybrid approach combining historical data with live prices
  4. Trading Days: Assumes ~252 trading days per year, ~126 for 6 months
  5. Turnover Calculation: Uses divisor of 10,000,000 to convert to crores
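Note 5 can be verified with a quick worked example, using the first sample row from the OHLCV format above (1 crore = 10,000,000 rupees):

```python
# Close 101.20 and volume 1,500,000 from the sample CSV row
close, volume = 101.20, 1_500_000
turnover_cr = (close * volume) / 10_000_000  # rupee turnover in crores
print(round(turnover_cr, 2))  # 15.18
```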

Source File Location

advanced_metrics_processor.py:1-175